package v0

import (
	"errors"
	"fmt"
	"gitee.com/ymofen/golang/gobase"
	"gitee.com/ymofen/grouptask/log"
	"gitee.com/ymofen/panicsafe"
	"os"
	"reflect"
	"runtime"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

/***
* 雇佣多个工人进行干活, N个工人干一组的活, 如果干完看其他组有没有需要干活的
*   1. 添加任务时, 指定分组id, 相同的id分配到相同的分组中
*   2. 默认情况下, 保证同一个组, 同时只会被一个纤程按顺序执行, 但是不能保证是同一个纤程
    3. 可以通过配置ConfigChannelMaxWorkNum 配置一个组,可以同时几个工人执行任务

编码规则
    1. 减少依赖,作为核心工具存在
       1> 不使用utils中的日志, 方便utils中的日志使用该库, 如果需要使用log, 使用自带的sdk自带的log


*/

type GroupTask struct {
	isTerminated      int32
	pushcnt           int32
	worker_num        int32 // 当前工作纤程数量
	taskcnt           int32 // 当前任务数量
	popcnt            int32
	pushfailcnt       int32
	free_channel_flag int32

	started         int32
	startSId        int32
	channel_obj_cnt int32 // 数量

	runlk   Mutex
	timeout time.Duration

	taskPool *TaskPool

	Id string

	FixedWorker byte

	// 0: 马上移除, 可能会导致移除失误
	// 1: 手动清理
	config_remove_flag int

	config_grow_work_if_task_num int32 //   任务堆积超过多少时 增长worker  <=0,不增长
	config_max_work_num          int32 // 最多工作纤程数(这些工作纤程, 轮询执行任务)
	config_min_work_num          int32 // 最少保存的工作纤程数据

	closeWg sync.WaitGroup

	channelLk            RWMutex
	channelSize          int32 // 组数量
	channelMap           map[interface{}]*groupTaskChannel
	channelPool          sync.Pool             // Pool可以使用,但是用来做队列不行
	channelMaxWorkNumMap map[interface{}]int32 // groupid:max-work-num 设置通道最大纤程数时使用, 如果创建通道时会获取

	postTaskChan              chan *groupTaskChannel
	latestcleanupmsg          string
	channel_max_queue_size    int32
	free_channel_timeout_secs float64 // 如果为0不进行清理
	last_free_channel_t       time.Time
	warnmsg                   string

	OnFatalCallBack func(sender *GroupTask, rec *groupTaskRec, err error)
}

var (
	defaultWorkers   *GroupTask = nil
	defaultWorkersLk RWMutex
	startedSId       atomic.Int32
)

var (
	checkQueueDuration = time.Millisecond * 200
)

func SetCheckQueueDuration(dura time.Duration) {
	checkQueueDuration = dura
}

func CheckInitialDefaultWorkers(minwork, maxwork int32, quemax int32) bool {
	if defaultWorkers == nil {
		defaultWorkersLk.Lock()
		defer defaultWorkersLk.Unlock()
		if defaultWorkers == nil {
			defaultWorkers = NewGroupTask()
			defaultWorkers.ConfigMaxWork(maxwork).ConfigMinWork(minwork)
			defaultWorkers.ConfigChannelMaxQueueSize(quemax)
			defaultWorkers.Start()
		}
	}
	return true
}

func checkDefaultWorkers() bool {
	if defaultWorkers == nil {
		defaultWorkersLk.Lock()
		defer defaultWorkersLk.Unlock()
		if defaultWorkers == nil {
			defaultWorkers = NewGroupTask()
			defaultWorkers.ConfigMaxWork(128).ConfigMinWork(20)
			defaultWorkers.ConfigChannelMaxQueueSize(8192)
			defaultWorkers.Start()
		}
	}
	return defaultWorkers != nil
}

func DefaultGroupTask() *GroupTask {
	if !checkDefaultWorkers() {
		panic("DefaultWorkers创建失败!!!")
	}
	return defaultWorkers
}

func NewGroupTaskEx(minWork, maxWork, maxQueueSize int) *GroupTask {
	rval := &GroupTask{
		worker_num:                   0,
		taskcnt:                      0,
		channel_max_queue_size:       int32(maxQueueSize),
		free_channel_timeout_secs:    120, // 清理间隔, 清理超时设定; 通道N秒没有交互将会被清理,
		config_min_work_num:          int32(minWork),
		config_max_work_num:          int32(maxWork),
		config_grow_work_if_task_num: 5,
		config_remove_flag:           1,
		channelMap:                   make(map[interface{}]*groupTaskChannel),
		channelMaxWorkNumMap:         make(map[interface{}]int32),
		timeout:                      time.Second,
	}
	rval.channelPool.New = rval.onNewChannel
	rval.Start()
	return rval
}

func NewGroupTask() *GroupTask {
	rval := &GroupTask{
		worker_num:                   0,
		taskcnt:                      0,
		channel_max_queue_size:       4096,
		free_channel_timeout_secs:    120, // 清理间隔, 清理超时设定; 通道N秒没有交互将会被清理,
		config_min_work_num:          10,
		config_max_work_num:          128,
		config_grow_work_if_task_num: 5,
		config_remove_flag:           1,
		channelMap:                   make(map[interface{}]*groupTaskChannel),
		channelMaxWorkNumMap:         make(map[interface{}]int32),
		timeout:                      time.Second,
	}
	rval.channelPool.New = rval.onNewChannel
	return rval
}

func (this *GroupTask) IsTerminated() bool {
	return this.isTerminated == 1
}

func (this *GroupTask) onNewChannel() interface{} {
	val := &groupTaskChannel{
		currentWorkCnt: 0,
		workMaxCnt:     1,
		max_queue_size: this.channel_max_queue_size,
		dataQueue:      NewSyncQueue(),
	}
	atomic.AddInt32(&this.channel_obj_cnt, 1)
	runtime.SetFinalizer(val, func(obj interface{}) {
		atomic.AddInt32(&this.channel_obj_cnt, -1)
	})
	return val
}

func (this *GroupTask) releaseChannel(val *groupTaskChannel) {
	quesize := atomic.LoadInt32(&val.queue_size)
	if val.currentWorkCnt != 0 || quesize != 0 {
		fmt.Fprintf(os.Stderr, "[BUG][%s]releaseChannel[%s] err: task_current_work_cnt(%d), task-size:%d", gobase.NowString(), val.IDText(), val.currentWorkCnt, quesize)
	}
	this.channelPool.Put(val)
}

func (this *GroupTask) ConfigPostTimeout(v time.Duration) {
	this.timeout = v
}

func (this *GroupTask) innerStart(beginFn func()) {
	if panicsafe.GoFunCatchException {
		defer panicsafe.DeferCatchPanic()
	}

	sid := this.startSId
	doneFlag := 0
	var donefn = func() {
		if doneFlag == 1 {
			return
		}

		doneFlag = 1
		// fmt.Printf("grouptask done begin\tsid=%d", sid)
		log.Debugf("grouptask done begin\tsid=%d", sid)
		this.closeWg.Done()
		log.Debugf("grouptask done\tsid=%d", sid)
	}
	defer donefn()

	{

		log.Debugf("grouptask start\tsid=%d", sid)
		taskChan := make(chan *groupTaskChannel, this.channel_max_queue_size)
		this.postTaskChan = taskChan
		taskPool := NewTaskPool(int(this.channel_max_queue_size))
		taskPool.WorkMin = this.config_min_work_num
		taskPool.WorkMax = this.config_max_work_num
		taskPool.RunMinWorkers()
		this.taskPool = taskPool
		defer taskPool.Close()

		runfn := func(ch *groupTaskChannel) {
			if !taskPool.PostTask(func(worker *TaskWorker, args ...interface{}) {
				if panicsafe.GoFunCatchException {
					defer panicsafe.DeferCatchPanic()
				}
				defer ch.endWork()
				atomic.AddInt32(&this.worker_num, 1)
				// log.Debugf("grouptask groupTaskChannel start\tsid=%d\tch=%v", this.startSId, ch.id)
				this.closeWg.Add(1)
				defer func() {
					// log.Debugf("grouptask groupTaskChannel done\tsid=%d\tch=%v", this.startSId, ch.id)
					this.closeWg.Done()
					atomic.AddInt32(&this.worker_num, -1)
				}()

				ch := args[0].(*groupTaskChannel)
				ch.lastExecStartT = time.Now()
				ch.lastGorouteId = worker.GoID
				this.doChannelWork(ch)
			}, ch) {
				// 任务指向失败
				this.pushfailcnt++
			}
		}

		//n := 0
		idleTick := time.NewTicker(time.Second * 60)
		beginFn()
	break_for:
		for {
			select {
			case ch := <-taskChan:
				if ch == nil {
					log.Debugf("grouptask recv stop sign\tsid=%d", this.startSId)
					break break_for
				}

				if ch.tryBeginWork() { // 尝试锁定
					runfn(ch)
					//n = 0
				} else {
					//n++
					//taskPool.checkRunWorker()
				}
			case <-idleTick.C:
				this.doIdle()
			}
		}

		log.Debugf("grouptask innerStart end\tsid=%d", this.startSId)
	}
	// donefn()
}

func (this *GroupTask) checkStart() {
	this.runlk.Lock()
	defer this.runlk.Unlock()
	if this.started == 1 {
		log.Debugf("grouptask start, current status 1\tsid=%d", this.startSId)
		return
	}
	this.started = 1
	this.isTerminated = 0
	this.closeWg.Add(1)
	// 等待启动好
	var wg sync.WaitGroup
	wg.Add(1)
	sid := startedSId.Add(1)
	this.startSId = sid
	go this.innerStart(func() {
		wg.Done()
	})
	wg.Wait()
}

/**
 * 可以重复执行g
 */
func (this *GroupTask) Start() {
	this.checkStart()
}

func (this *GroupTask) DetailsEx2(all bool, max int, searchval, exclusive string) string {
	var sb strings.Builder
	i := 0
	lst := make([]*groupTaskChannel, 0)
	this.channelLk.RLock()
	defer this.channelLk.RUnlock()
	for _, itm := range this.channelMap {
		if len(exclusive) > 0 {
			if strings.Contains(itm.IDText(), exclusive) { // 排除
				continue
			}
		}

		matched := false

		if len(searchval) > 0 {
			if strings.Contains(itm.IDText(), searchval) { // 搜索
				matched = true
			} else {
				continue
			}
		}

		if all || itm.queue_size > 0 || matched || itm.task_current_busy_cnt > 0 {
			//itm.tagstr = fmt.Sprintf("%d", itm.GetBusyDuration())
			lst = append(lst, itm)
			i++
		}
	}

	sort.Slice(lst, func(i, j int) bool {
		itmi := lst[i]
		itmj := lst[j]

		//// 堆积比较严重
		//if (itmi.queue_size > 1000) != (itmj.queue_size > 1000) {
		//	return itmi.queue_size > 1000
		//}
		//if (itmi.queue_size > 100) != (itmj.queue_size > 100) {
		//	return itmi.queue_size > 100
		//}
		//

		//return StrToInt64Def(itmi.tagstr, 0) < StrToInt64Def(itmj.tagstr, 0)

		dura0 := itmi.GetBusyDuration()
		dura1 := itmj.GetBusyDuration()
		if dura0 != dura1 {
			return dura0 > dura1
		}

		if (itmi.queue_size > 0) != (itmj.queue_size > 0) {
			return itmi.queue_size > 0
		}

		//
		if (itmi.push_err_n > 0) != (itmj.push_err_n > 0) {
			return itmi.push_err_n > 0
		}

		if (itmi.lastPushFailTask != nil) != (itmj.lastPushFailTask != nil) {
			return itmi.lastPushFailTask != nil
		}

		if itmi.queue_size != itmj.queue_size {
			return itmi.queue_size > itmj.queue_size
		}

		return itmi.IDText() < itmj.IDText()

		// return itmi.queue_size > itmj.queue_size;
		//return itmi.last_push_T.After(itmj.last_push_T)
	})

	for i := 0; i < len(lst); i++ {
		itm := lst[i]
		if sb.Len() > 0 {
			sb.WriteString("\r\n")
		}
		sb.WriteString(fmt.Sprintf("%d\r\n", i+1))
		sb.WriteString(itm.StatusString())
		if max > 0 && i >= max {
			break
		}
	}

	return sb.String()
}

func (this *GroupTask) CheckAbnormal() (err error) {
	this.channelLk.RLock()
	defer this.channelLk.RUnlock()
	for _, itm := range this.channelMap {
		if itm.IsAbnormal() {
			err = fmt.Errorf("[%s]执行超时(%d)ms", itm.IDText(), itm.GetBusyDuration().Milliseconds())
			return err
		}
	}
	return nil
}

func (this *GroupTask) DetailsEx(all bool, max int) string {
	return this.DetailsEx2(all, max, "", "")
}

func (this *GroupTask) Details(all bool) string {
	return this.DetailsEx(all, 0)
}

/*
设置通道的最大工作纤程数量
 1. 如果设置成大于1或者0, 会有多线程同时执行通道内任务
*/
func (this *GroupTask) ConfigChannelMaxWorkNum(groupid interface{}, maxWorkNum int32) *GroupTask {
	this.channelLk.Lock()
	defer this.channelLk.Unlock()
	this.channelMaxWorkNumMap[groupid] = maxWorkNum
	itm := this.channelMap[groupid]
	if itm != nil {
		itm.workMaxCnt = maxWorkNum
	}
	return this
}

func (this *GroupTask) ConfigMinWork(num int32) *GroupTask {
	this.config_min_work_num = num
	return this
}

func (this *GroupTask) ConfigGrowIfTaskPiled(num int32) *GroupTask {
	this.config_grow_work_if_task_num = num
	return this
}

func (this *GroupTask) ConfigMaxWork(num int32) *GroupTask {
	this.runlk.Lock()
	defer this.runlk.Unlock()
	if this.started == 1 {
		log.Warnf("grouptask ConfigMaxWork, currrent status is started\tsid=%d", this.startSId)
		return this
	}
	this.config_max_work_num = num
	return this
}

func (this *GroupTask) ConfigChannelMaxQueueSize(queue_size int32) *GroupTask {
	this.runlk.Lock()
	defer this.runlk.Unlock()
	if this.started == 1 {
		log.Warnf("grouptask config, currrent status is started\tsid=%d", this.startSId)
		return this
	}
	this.channel_max_queue_size = queue_size
	return this
}

func (this *GroupTask) GetChannelMaxQueueSize() int32 {
	return this.channel_max_queue_size
}

func (this *GroupTask) TaskCnt() int32 {
	return this.taskcnt
}

func (this *GroupTask) Terminate() {
	this.isTerminated = 1
}

func (this *GroupTask) Stop() {
	this.runlk.Lock()
	if this.started == 0 {
		log.Debugf("grouptask stop. current status is shutdown\t")
		this.runlk.Unlock()
		return
	}
	log.Debugf("grouptask will stop\tsid=%d", this.startSId)
	this.isTerminated = 1
	flag := 0
	taskChan := this.postTaskChan
	if taskChan != nil {
		select {
		case taskChan <- nil:
			log.Debugf("grouptask push stop signal to taskChan\tsid=%d", this.startSId)
		case <-time.After(time.Second):
			flag = -1
		}
		if flag == -1 {
			log.Error("err stop push")
		}
	}
	this.closeWg.Wait()
	this.started = 0
	this.runlk.Unlock()
}

func (this *GroupTask) GroupStatus(groupid interface{}) string {
	this.channelLk.RLock()
	defer this.channelLk.RUnlock()
	itm := this.channelMap[groupid]
	if itm == nil {
		return "NULL"
	}
	return itm.StatusString()
}

func (this *GroupTask) GroupSimpleStatus(groupid interface{}) string {
	this.channelLk.RLock()
	defer this.channelLk.RUnlock()
	itm := this.channelMap[groupid]
	if itm == nil {
		return "NULL"
	}
	return fmt.Sprintf("work: %d/%d(max), task(remain/fail/total):%d/%d/%d, goid:%d(%d ms)",
		itm.currentWorkCnt, itm.workMaxCnt, itm.queue_size, itm.push_err_n, itm.push_cnt, itm.lastGorouteId, itm.GetBusyDuration().Milliseconds())
}

func (this *GroupTask) StatusSimpleString() string {
	var sb strings.Builder
	sb.WriteString(fmt.Sprintf("group:%d", this.channelSize))

	sb.WriteString(fmt.Sprintf(", workers[2]:%s", this.taskPool.Status()))
	sb.WriteString(fmt.Sprintf(", task:%d", this.taskcnt))
	sb.WriteString(fmt.Sprintf(", cmdpost-chan-size:%d/%d", len(this.postTaskChan), cap(this.postTaskChan)))

	sb.WriteString(fmt.Sprintf(", pushfail:%d", this.pushfailcnt))

	return sb.String()
}

func (this *GroupTask) StatusString() string {
	var sb strings.Builder
	sb.WriteString("状态:")
	if this.isTerminated == 0 {
		sb.WriteString("开启\r\n")
	} else {
		sb.WriteString("停止\r\n")
	}
	sb.WriteString(fmt.Sprintf("组数量:%d\r\n", this.channelSize))

	taskPool := this.taskPool
	if taskPool != nil {
		sb.WriteString(fmt.Sprintf("线程池: %s\r\n", taskPool.Status()))
	}
	sb.WriteString(fmt.Sprintf("任务投递队列: %d/%d\r\n", len(this.postTaskChan), cap(this.postTaskChan)))

	sb.WriteString(fmt.Sprintf("当前任务:%d\r\n", this.taskcnt))
	sb.WriteString(fmt.Sprintf("任务计数: pushfail:%d\r\n", this.pushfailcnt))
	sb.WriteString(fmt.Sprintf("通道对象存活数:%d %s\r\n", this.channel_obj_cnt, this.latestcleanupmsg))

	if len(this.warnmsg) > 0 {
		sb.WriteString(this.warnmsg)
		sb.WriteString("\r\n")
	}

	sb.WriteString("尚未执行完成任务组(最多罗列10组):\r\n")
	sb.WriteString(this.DetailsEx(false, 10))
	return sb.String()
}

func (this *GroupTask) cleanup(secs float64) {
	this.channelLk.Lock()
	defer this.channelLk.Unlock()
	n := 0
	for _, channel := range this.channelMap {
		if channel.checkTryCleanup(secs) {
			channel.closeflag = 1
			delete(this.channelMap, channel.id)
			this.channelSize--
			channel.id = nil
			this.releaseChannel(channel)
			n++
		}
	}
	if n > 0 {
		this.latestcleanupmsg = fmt.Sprintf("[%s]清理[%d]没有使用的通道", gobase.NowString(), n)
	}
}

func (this *GroupTask) onTaskFuncArgs(sender *groupTaskRec) {
	cbFun := sender.paramFunc.(func(args ...interface{}))
	if cbFun != nil {
		cbFun(sender.Args...)
	}

}

func (this *GroupTask) onTaskFuncNoArgs(sender *groupTaskRec) {
	cbFun := sender.paramFunc.(func())
	if cbFun != nil {
		cbFun()
	}
}

/***
 * 可以用做同步执行
 */
func (this *GroupTask) PostTaskFuncArgs(groupid interface{}, cb func(args ...interface{}), args ...interface{}) error {
	rec := &groupTaskRec{
		Args:      args,
		paramFunc: cb,
		Cb:        this.onTaskFuncArgs,
	}
	return this.postTask(groupid, rec)
}

type waitrec struct {
	doflag   int32
	complete chan byte
}

func (this *GroupTask) WaitExecFunc(sender *groupTaskRec) {
	cbFun := sender.paramFunc.(func())
	waitRec := sender.arg.(*waitrec)
	if atomic.CompareAndSwapInt32(&waitRec.doflag, 0, 1) { // 执行
		if cbFun != nil {
			cbFun()
		}
		close(waitRec.complete) // 改成close 不会阻塞
		//waitRec.complete <- 1 // 只有争取到, 才需要推入执行完成标记
	}
}

/*
参数:

	timeout:表示等待执行的时间， 如果到时尚未执行, 则函数不会再执行, 如果到超时时间已经在执行，则会等待执行完成

返回:

	ok: true: 表示执行成功, false:执行超时
	err: 投递任务失败, wait timeout
*/
func (this *GroupTask) PostTaskFuncAndWait(groupid interface{}, timeout time.Duration, cb func()) (ok bool, err error) {
	waitRec := &waitrec{doflag: 0, complete: make(chan byte)}
	rec := &groupTaskRec{
		arg:       waitRec,
		paramFunc: cb,
		Cb:        this.WaitExecFunc,
	}
	err = this.postTask(groupid, rec)

	if err != nil {
		return
	}

	timer := time.NewTimer(timeout)
	select {
	case <-waitRec.complete:
		if !timer.Stop() {
			<-timer.C
		}
		return true, nil
	case <-timer.C:
		if atomic.CompareAndSwapInt32(&waitRec.doflag, 0, 2) { // doflag 设为失败
			//return false, fmt.Errorf("task post wait timeout!")
			return false, nil
		} else { // 设置失败, 等待执行完成
			<-waitRec.complete
			return true, nil
		}
	}
}

func (this *GroupTask) PostTaskFunc(groupid interface{}, cb func()) error {
	rec := &groupTaskRec{
		Args:      nil,
		paramFunc: cb,
		Cb:        this.onTaskFuncNoArgs,
	}
	return this.postTask(groupid, rec)
}

/*
需要调用者确保groupChannel可用 没有被移除掉
*/
func (this *GroupTask) PostChannelTaskFunc(groupChanel interface{}, cb func(args ...interface{}), args ...interface{}) error {
	rec := &groupTaskRec{
		Args:      args,
		paramFunc: cb,
		Cb:        this.onTaskFuncArgs,
	}
	channel := groupChanel.(*groupTaskChannel)
	rec.groupID = channel.id
	return this.innerPostGroupTask(channel, rec)
}

func (this *GroupTask) CheckCreateFixedGroupChannel(groupid interface{}) *groupTaskChannel {
	rval := this.checkCreateGroup(groupid)
	rval.fixed = 1
	return rval
}

func (this *GroupTask) RemoveFixedGroupChannel(groupid interface{}) bool {
	this.channelLk.Lock()
	defer this.channelLk.Unlock()
	rval := this.channelMap[groupid]
	if rval == nil {
		if !panicsafe.GoFunCatchException { // 调试模式, 抛出异常
			panic(fmt.Sprintf("%s channel is null", groupid))
		}
		return false
	}
	delete(this.channelMap, groupid)
	return true
}

func (this *GroupTask) PostTaskArgs(groupid interface{}, cb func(sender *groupTaskRec), args ...interface{}) error {
	rec := &groupTaskRec{
		Args: args,
		Cb:   cb,
	}
	return this.postTask(groupid, rec)
}

func (this *GroupTask) PostTaskArgsEx(groupid interface{}, cb func(args ...interface{}), args ...interface{}) error {
	rec := &groupTaskRec{
		Args: args,
		Cb: func(sender *groupTaskRec) {
			cb(sender.Args...)
		},
	}
	return this.postTask(groupid, rec)
}

func (this *GroupTask) checkGetGroup(groupid interface{}) *groupTaskChannel {
	this.channelLk.RLock()
	defer this.channelLk.RUnlock()
	return this.channelMap[groupid]

}

func (this *GroupTask) checkCreateGroup(groupid interface{}) *groupTaskChannel {
	// 先尝试获取, 这样如果不是经常更改的channel 不会经常进入抢占锁
	channel := this.checkGetGroup(groupid)
	if channel != nil {
		return channel
	}
	this.channelLk.Lock()
	defer this.channelLk.Unlock()
	channel = this.channelMap[groupid]
	if channel == nil {
		channel = this.channelPool.Get().(*groupTaskChannel)
		channel.reset()
		channel.id = groupid
		channel.lastActivity = time.Now()
		if n, ok := this.channelMaxWorkNumMap[groupid]; ok {
			channel.workMaxCnt = n
		}

		this.channelMap[groupid] = channel
		this.channelSize++
	}
	return channel
}

func (this *GroupTask) innerPostGroupTask(groupChannel *groupTaskChannel, rec *groupTaskRec) error {
	if this.isTerminated == 1 {
		return errors.New("[停止工作]不能压入任务")
	}
	taskChan := this.postTaskChan
	if this.started == 0 || taskChan == nil {
		return errors.New("[停止工作]不能压入任务")
	}

	atomic.AddInt32(&this.pushcnt, 1)
	atomic.AddInt32(&this.taskcnt, 1)
	rec.t0 = time.Now()
	err := groupChannel.push(rec)
	if err != nil {
		// 压入失败 -1
		atomic.AddInt32(&this.taskcnt, -1)
		atomic.AddInt32(&this.pushfailcnt, 1)
		return err
	}
	select {
	case taskChan <- groupChannel:
	case <-time.After(this.timeout):
		atomic.AddInt32(&this.taskcnt, -1)
		atomic.AddInt32(&this.pushfailcnt, 1)
		return err
	}
	return nil

}

func (this *GroupTask) postTask(groupid interface{}, rec *groupTaskRec) error {
	if this.isTerminated == 1 {
		return fmt.Errorf("grouptask push, current status is terminated\tsid=%d", this.startSId)
	}
	rec.groupID = groupid
	realChannel := this.checkCreateGroup(groupid)
	return this.innerPostGroupTask(realChannel, rec)
}

func (this *GroupTask) doIdle() {
	if this.free_channel_timeout_secs > 0 { // 尝试进行清理
		if atomic.CompareAndSwapInt32(&this.free_channel_flag, 0, 1) {
			if this.last_free_channel_t.IsZero() || time.Now().Sub(this.last_free_channel_t).Seconds() > this.free_channel_timeout_secs {
				this.cleanup(this.free_channel_timeout_secs)
				this.last_free_channel_t = time.Now()
			}
			this.free_channel_flag = 0
		}
	}
}

/**
 * 如果channel 运行多个纤程工作, 通一个channel会有多纤程同时执行
 */
func (this *GroupTask) doChannelWork(channel *groupTaskChannel) {
	atomic.AddInt32(&channel.task_current_busy_cnt, 1)
	defer func() {
		atomic.AddInt32(&channel.task_current_busy_cnt, -1)
	}()
	for {
		rec := channel.pop()
		if rec == nil {
			break
		}
		atomic.AddInt32(&this.taskcnt, -1)
		atomic.AddInt32(&this.popcnt, 1)

		if this.isTerminated == 1 {
			rec.err = errors.New("[工作停止]请勿执行耗时操作")
		}

		if checkQueueDuration > 0 {
			dura := time.Since(rec.t0)
			if dura > checkQueueDuration {
				log.Warnf("The time from submission to execution is %dms.\tgroupid=%v", dura.Milliseconds(), rec.groupID)
			}
		}

		rec.CallBack()
	}
}

type groupTaskRec struct {
	groupID interface{} // 传入时写入一次
	err     error       // 执行者 通知Error

	arg interface{}
	t0  time.Time

	// 传入参数
	Args      []interface{}
	paramFunc interface{}
	Cb        func(sender *groupTaskRec)
	CallStack string
}

func (this *groupTaskRec) GetGroupID() interface{} {
	return this.groupID
}

func (this *groupTaskRec) GetError() error {
	return this.err
}

func (this *groupTaskRec) CallBack() {
	if panicsafe.GoFunCatchException {
		defer panicsafe.DeferCatchPanic()
	}
	this.Cb(this)
	this.Close()
}

func (this *groupTaskRec) Close() {
	this.CallStack = ""
	this.groupID = nil
	this.Args = nil
	this.Cb = nil
	this.err = nil
	this.paramFunc = nil
}

/*
**

	如果使用chan 做纤程进入计数,
	  不能中途改变 允许进入纤程大小
*/
type groupTaskChannel struct {
	closeflag             int32
	task_current_busy_cnt int32
	currentWorkCnt        int32  // 当前正在工作的纤程数
	workMaxCnt            int32  // 最大允许的工作纤程数量
	push_cnt              uint32 // 压入次数
	queue_size            int32  // 当前任务数量
	max_queue_size        int32  // 最大的运行队列数量
	push_err_n            int32  // 压入失败数量

	fixed     byte // 0: 不固定, 长时间不用会被移除, 1:固定, 需要手动移除
	id        interface{}
	dataQueue *SyncQueue // 任务队列

	lastActivity   time.Time // 最后活动时间
	lastPushT      time.Time
	lastPopT       time.Time
	lastExecStartT time.Time

	lastGorouteId uint64

	// 最后压入的任务
	lastPushTask *groupTaskRec

	// 最后压入失败的任务
	lastPushFailTask *groupTaskRec

	tagstr string
}

func (this *groupTaskChannel) reset() {
	this.closeflag = 0
	this.workMaxCnt = 1 // 默认为1
	this.currentWorkCnt = 0
	this.push_cnt = 0
	this.queue_size = 0
	this.push_err_n = 0
	this.id = nil
}

func (this *groupTaskChannel) push(rec *groupTaskRec) error {
	if this.closeflag != 0 {
		atomic.AddInt32(&this.push_err_n, 1)
		return errors.New(fmt.Sprintf("队列[%v]被标记为不可用,请重新进行压入！", this.id))
	}

	r := atomic.AddInt32(&this.queue_size, 1)
	if r > this.max_queue_size { // 失败
		atomic.AddInt32(&this.queue_size, -1)
		atomic.AddInt32(&this.push_err_n, 1)
		this.lastPushFailTask = rec
		return errors.New(fmt.Sprintf("队列[%v]超过堆积数量[%d]！", this.id, this.max_queue_size))
	}
	atomic.AddUint32(&this.push_cnt, 1)
	this.dataQueue.Push(rec)
	this.lastPushT = time.Now()
	this.lastPushTask = rec
	return nil
}

func (this *groupTaskChannel) pop() *groupTaskRec {
	ok, rval := this.dataQueue.Pop()
	if ok {
		atomic.AddInt32(&this.queue_size, -1)
		this.lastPopT = time.Now()
		return rval.(*groupTaskRec)
	} else {
		return nil
	}
}

func (this *groupTaskChannel) String() string {
	var sb strings.Builder
	sb.WriteString("{")
	strid := gobase.LeftStr(fmt.Sprintf("%v", this.id), 32)
	if len(strid) > 0 {
		sb.WriteString(fmt.Sprintf(`"id":"%s",`, strid))
	}
	sb.WriteString(fmt.Sprintf(`"current-work-n":"%d/%d",`, this.currentWorkCnt, this.workMaxCnt))
	sb.WriteString(fmt.Sprintf(`"remain":%d,`, atomic.LoadInt32(&this.queue_size)))
	sb.WriteString(fmt.Sprintf(`"push-err-n":%d`, atomic.LoadInt32(&this.push_err_n)))
	sb.WriteString("}")
	return sb.String()
}

func (this *groupTaskChannel) GetBusyDuration() time.Duration {
	busycnt := atomic.LoadInt32(&this.task_current_busy_cnt)
	if busycnt == 0 {
		return 0
	}
	return time.Now().Sub(this.lastExecStartT)
}

func (this *groupTaskChannel) IDText() string {
	if this.id == nil {
		return "NIL"
	} else {
		switch vv := this.id.(type) {
		case string:
			return gobase.LeftStr(vv, 64)
		case int, int8, int32, int64, uint, uint32, uint8:
			return fmt.Sprintf("%d", vv)
		case float64, float32:
			return fmt.Sprintf("%v", vv)
		default:
			return gobase.LeftStr(fmt.Sprintf("%s:%v", reflect.TypeOf(this.id).String(), this.id), 64)
		}
	}
}

func (this *groupTaskChannel) IsAbnormal() bool {
	return this.GetBusyDuration().Seconds() > 10
}

func (this *groupTaskChannel) StatusString() string {
	var sb strings.Builder
	sb.WriteString(fmt.Sprintf("id: %s\n", this.IDText()))
	if this.lastGorouteId != 0 {
		sb.WriteString(fmt.Sprintf("lastgoid:%d\n", this.lastGorouteId))
	}
	sb.WriteString(fmt.Sprintf("last_exec_start_t: %s (%d ms)\n", gobase.DateTimeString2(this.lastExecStartT), this.GetBusyDuration().Milliseconds()))
	sb.WriteString(fmt.Sprintf("last_activity:%s\n", gobase.DateTimeString2(this.lastActivity)))
	sb.WriteString(fmt.Sprintf("work: [%d], %d/%d(max)\n", this.task_current_busy_cnt, this.currentWorkCnt, this.workMaxCnt))
	sb.WriteString(fmt.Sprintf("task: %d (remain), %d(push fail)\n", this.queue_size, this.push_err_n))
	sb.WriteString(fmt.Sprintf("tagstr:%s\n", this.tagstr))
	t1 := this.lastPushFailTask
	if t1 != nil {
		sb.WriteString(fmt.Sprintf("last_push_fail_callstack:%s\n", t1.CallStack))
	}

	t2 := this.lastPushTask
	if t2 != nil && len(t2.CallStack) > 0 {
		sb.WriteString(fmt.Sprintf("last_push_callstack:%s\n", t2.CallStack))
	}

	return sb.String()
}

func (this *groupTaskChannel) needWorker() bool {
	if this.currentWorkCnt >= this.workMaxCnt {
		return false
	}

	if this.queue_size == 0 {
		return false
	}

	return true
}

/*
**
  - 开始工作
  - 2. 开启工作后, Push可以正常加入
*/
func (this *groupTaskChannel) tryBeginWork() bool {
	if atomic.LoadInt32(&this.queue_size) == 0 { // 不需要工作
		return false
	}

	if this.workMaxCnt == 0 { // 没有限制工作纤程的进入
		atomic.AddInt32(&this.currentWorkCnt, 1)
		return true
	}

	if atomic.LoadInt32(&this.currentWorkCnt) >= this.workMaxCnt { // 工作纤程已经达到了最大值
		return false
	}

	r := atomic.AddInt32(&this.currentWorkCnt, 1)
	if r <= this.workMaxCnt {
		return true
	} else {
		atomic.AddInt32(&this.currentWorkCnt, -1)
		return false
	}
}

/**
 * 关闭工作
 */
func (this *groupTaskChannel) endWork() bool {
	r := atomic.AddInt32(&this.currentWorkCnt, -1)
	if r == 0 {
		this.lastActivity = time.Now()
	}

	if r < 0 {
		fmt.Fprintf(os.Stderr, "[BUG][%v]endWork cnt err:%d, (%d)\r\n", this.id, r, this.currentWorkCnt)
	}

	return true
}

/***
 * true  表示可以进行清理
 * false 表示不可以进行清理,或者已经被清理
 */
func (this *groupTaskChannel) checkTryCleanup(secs float64) bool {

	if this.fixed == 1 {
		return false
	}

	if this.queue_size > 0 {
		return false
	}

	if this.currentWorkCnt >= 1 { // 有工作纤程进入
		return false
	}

	if this.lastActivity.IsZero() {
		return false
	}

	if time.Now().Sub(this.lastActivity).Seconds() > secs {
		return true
	} else {
		return false
	}
}
